package com.atguigu.flink.wordcount;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Created by Smexy on 2023/11/7
 *
 *  Unified stream/batch processing:
 *      The same streaming API yields either streaming or batch semantics,
 *      selected purely through runtime configuration.
 *
 *      Unbounded streams: only streaming execution is supported.
 *      Bounded streams:   support both batch execution
 *                         and streaming execution.
 *
 *      Default runtime mode: STREAMING
 *          Alternatives: BATCH
 *                        AUTOMATIC — runs in batch mode when every source is
 *                        bounded; as soon as any source is unbounded, runs in
 *                        streaming mode.
 */
public class Demo4_StreamBatchOne
{
    /**
     * Demonstrates Flink's unified stream/batch execution: the same streaming
     * API runs in batch or streaming mode depending on the configured
     * {@link RuntimeExecutionMode}.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // AUTOMATIC: execute as a batch job only when every source is bounded;
        // any unbounded source forces streaming execution.
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Bounded source (file). Kept for experimenting with batch execution;
        // the stream built from it is currently commented out below.
        FileSource<String> source = FileSource.forRecordStreamFormat(
                                                 new TextLineInputFormat(), new Path("data/words.txt"))
                                             .build();
        //DataStreamSource<String> dataStreamSource = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "wc");

        // Unbounded sources (sockets): under AUTOMATIC these force streaming mode.
        DataStreamSource<String> dataStreamSource1 = environment.socketTextStream("hadoop102", 8888);
        DataStreamSource<String> dataStreamSource2 = environment.socketTextStream("hadoop102", 8889);

        // Identical word-count pipeline on both sources; only the sink differs
        // (stdout vs. stderr) so the two jobs' output can be told apart.
        buildWordCount(dataStreamSource1).print();
        buildWordCount(dataStreamSource2).printToErr();

        environment.execute();
    }

    /**
     * Builds the shared word-count pipeline: split each line into
     * {@code (word, 1)} pairs, key by the word, and sum the counts.
     *
     * @param lines source of raw text lines
     * @return running per-word counts as {@code (word, count)} tuples
     */
    private static SingleOutputStreamOperator<Tuple2<String, Integer>> buildWordCount(
            DataStreamSource<String> lines) {
        return lines
            .flatMap(new MyWordCountFlatmapFunction2())
            // Anonymous KeySelector (rather than a lambda) so Flink can infer
            // the Tuple2 type information despite Java generic erasure.
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>()
            {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    // Group by the word itself (first tuple field).
                    return value.f0;
                }
            })
            .sum(1);
    }

    /** Splits each input line on single spaces and emits one (word, 1) pair per token. */
    private static class MyWordCountFlatmapFunction2 implements FlatMapFunction<String, Tuple2<String, Integer>>
    {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            Arrays.stream(words)
                  .forEach(
                      word -> out.collect(Tuple2.of(word, 1))
                  );
        }
    }
}
